/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.s2graph.core

import java.util.Base64

import com.google.protobuf.ByteString
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{Json, _}

import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent.Future
import scala.util.Random

object PostProcess {


  type EDGE_VALUES = Map[String, JsValue]
  type ORDER_BY_VALUES =  (Any, Any, Any, Any)
  type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
  type GROUP_BY_KEY = Map[String, JsValue]

  /**
   * Result Entity score field name
   */
  val emptyDegrees = Seq.empty[JsValue]
  val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true, "rpcFail" -> 0)

  def badRequestResults(ex: => Exception) = ex match {
    case ex: BadQueryException => Json.obj("message" -> ex.msg)
    case _ => Json.obj("message" -> ex.getMessage)
  }

  def s2EdgeParent(graph: S2GraphLike,
                   queryOption: QueryOption,
                   parentEdges: Seq[EdgeWithScore]): JsValue = {
    if (parentEdges.isEmpty) JsNull
    else {
      val ancestors = for {
        current <- parentEdges
        parents = s2EdgeParent(graph, queryOption, current.edge.getParentEdges()) if parents != JsNull
      } yield {
          val s2Edge = current.edge.getOriginalEdgeOpt().getOrElse(current.edge)
          s2EdgeToJsValue(queryOption, current.copy(edge = s2Edge), false, parents = parents, checkSelectColumns = true)
        }
      Json.toJson(ancestors)
    }
  }

  def s2EdgeToJsValue(queryOption: QueryOption,
                      edgeWithScore: EdgeWithScore,
                      isDegree: Boolean = false,
                      parents: JsValue = JsNull,
                      checkSelectColumns: Boolean = false): JsValue = {
    //    val builder = immutable.Map.newBuilder[String, JsValue]
    val builder = ArrayBuffer.empty[(String, JsValue)]
    val s2Edge = edgeWithScore.edge
    val score = edgeWithScore.score
    val label = edgeWithScore.label
    if (isDegree) {
      builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
      builder += ("label" -> anyValToJsValue(label.label).get)
      builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get)
      builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propertyValueInner(LabelMeta.degree).innerVal.value).get)
      JsObject(builder)
    } else {
      if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get)

      if (queryOption.selectColumns.isEmpty) {
        builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
        builder += ("to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get)
        builder += ("label" -> anyValToJsValue(label.label).get)

        val innerProps = ArrayBuffer.empty[(String, JsValue)]
        for {
          (labelMeta, v) <- edgeWithScore.edge.propertyValues()
          jsValue <- anyValToJsValue(v.innerVal.value)
        } {
          innerProps += (labelMeta.name -> jsValue)
        }


        builder += ("props" -> JsObject(innerProps))
        builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get)
        builder += ("timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get)
        builder += ("_timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get) // backward compatibility
        if (parents != JsNull) builder += ("parents" -> parents)
        //          Json.toJson(builder.result())
        JsObject(builder)
      } else {
        queryOption.selectColumnsMap.foreach { case (columnName, _) =>
          columnName match {
            case "from" => builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
            case "_from" => builder += ("_from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get)
            case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get)
            case "_to" => builder += ("_to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get)
            case "label" => builder += ("label" -> anyValToJsValue(label.label).get)
            case "direction" => builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get)
            case "timestamp" => builder += ("timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get)
            case "_timestamp" => builder += ("_timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get)
            case _ => // should not happen

          }
        }
        val innerProps = ArrayBuffer.empty[(String, JsValue)]
        for {
          (selectColumnName, _) <- queryOption.selectColumnsMap
          labelMeta <- label.metaPropsInvMap.get(selectColumnName)
          innerValWithTs = edgeWithScore.edge.propertyValueInner(labelMeta)
          jsValue <- anyValToJsValue(innerValWithTs.innerVal.value)
        } {
          innerProps += (labelMeta.name -> jsValue)
        }

        builder += ("props" -> JsObject(innerProps))
        if (parents != JsNull) builder += ("parents" -> parents)
        JsObject(builder)
      }
    }
  }

  def s2VertexToJson(s2Vertex: S2VertexLike): Option[JsValue] = {
    val props = for {
      (_, property) <- s2Vertex.props
      jsVal <- anyValToJsValue(property.value)
    } yield property.columnMeta.name -> jsVal

    for {
      id <- anyValToJsValue(s2Vertex.innerIdVal)
    } yield {
      Json.obj(
        "serviceName" -> s2Vertex.serviceName,
        "columnName" -> s2Vertex.columnName,
        "id" -> id,
        "props" -> Json.toJson(props),
        "timestamp" -> s2Vertex.ts
      )
    }
  }

  def verticesToJson(s2Vertices: Seq[S2VertexLike]): JsValue =
    Json.toJson(s2Vertices.flatMap(s2VertexToJson(_)))

  def withOptionalFields(queryOption: QueryOption,
                         size: Int,
                         degrees: Seq[JsValue],
                         results: Seq[JsValue],
                         failCount: Int = 0,
                         cursors: => JsValue,
                         nextQuery: => Option[JsValue]): JsValue = {

    val kvs = new ArrayBuffer[(String, JsValue)]()


    kvs.append("size" -> JsNumber(size))
    kvs.append("degrees" -> JsArray(degrees))
    kvs.append("results" -> JsArray(results))

    if (queryOption.impIdOpt.isDefined) kvs.append(Experiment.ImpressionKey -> JsString(queryOption.impIdOpt.get))

    JsObject(kvs)
  }

  def buildJsonWith(js: JsValue)(implicit fn: (String, JsValue) => JsValue): JsValue = js match {
    case JsObject(obj) => JsObject(obj.map { case (k, v) => k -> buildJsonWith(fn(k, v)) })
    case JsArray(arr) => JsArray(arr.map(buildJsonWith(_)))
    case _ => js
  }

  def toJson(orgQuery: Option[JsValue])(graph: S2GraphLike,
                                        queryOption: QueryOption,
                                        stepResult: StepResult): JsValue = {

    // [[cursor, cursor], [cursor]]
    lazy val cursors: Seq[Seq[String]] = stepResult.accumulatedCursors.map { stepCursors =>
      stepCursors.map { cursor => new String(Base64.getEncoder.encode(cursor)) }
    }

    lazy val cursorJson: JsValue = Json.toJson(cursors)

    // build nextQuery with (original query + cursors)
    lazy val nextQuery: Option[JsValue] = {
      if (cursors.exists { stepCursors => stepCursors.exists(_ != "") }) {
        val cursorIter = cursors.iterator

        orgQuery.map { query =>
          buildJsonWith(query) { (key, js) =>
            if (key == "step") {
              val currentCursor = cursorIter.next
              val res = js.as[Seq[JsObject]].toStream.zip(currentCursor).filterNot(_._2 == "").map { case (obj, cursor) =>
                val label = (obj \ "label").as[String]
                if (Label.findByName(label).get.schemaVersion == "v4") obj + ("cursor" -> JsString(cursor))
                else {
                  val limit = (obj \ "limit").asOpt[Int].getOrElse(RequestParser.defaultLimit)
                  val offset = (obj \ "offset").asOpt[Int].getOrElse(0)
                  obj + ("offset" -> JsNumber(offset + limit))
                }
              }

              JsArray(res)
            } else js
          }
        }
      } else Option(JsNull)
    }

    val limitOpt = queryOption.limitOpt
    val selectColumns = queryOption.selectColumnsMap
    val degrees =
      if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(queryOption, t, true, JsNull))
      else emptyDegrees

    if (queryOption.groupBy.keys.isEmpty) {
      // no group by specified on query.
      val results = if (limitOpt.isDefined) stepResult.edgeWithScores.take(limitOpt.get) else stepResult.edgeWithScores
      val ls = results.map { t =>
        val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.getParentEdges()) else JsNull

        s2EdgeToJsValue(queryOption, t, false, parents)
      }

      withOptionalFields(queryOption, ls.size, degrees, ls, stepResult.failCount, cursorJson, nextQuery)
    } else {

      val grouped = if (limitOpt.isDefined) stepResult.grouped.take(limitOpt.get) else stepResult.grouped
      val results =
        for {
          (groupByValues, (scoreSum, edges)) <- grouped
        } yield {
          val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) =>
            k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull)
          }
          val groupByValuesJson = Json.toJson(groupByKeyValues.toMap)

          if (!queryOption.returnAgg) {
            Json.obj(
              "groupBy" -> groupByValuesJson,
              "scoreSum" -> scoreSum,
              "agg" -> Json.arr()
            )
          } else {
            val agg = edges.map { t =>
              val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.getParentEdges()) else JsNull
              s2EdgeToJsValue(queryOption, t, false, parents)
            }
            val aggJson = Json.toJson(agg)
            Json.obj(
              "groupBy" -> groupByValuesJson,
              "scoreSum" -> scoreSum,
              "agg" -> aggJson
            )
          }
        }

      withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
    }
  }

  def s2EdgePropsJsonString(edge: S2EdgeLike): String =
    Json.toJson(s2EdgePropsJson(edge)).toString()

  def s2VertexPropsJsonString(vertex: S2VertexLike): String =
    Json.toJson(s2VertexPropsJson(vertex)).toString()

  def s2EdgePropsJson(edge: S2EdgeLike): Map[String, JsValue] = {
    import scala.collection.JavaConverters._
    for {
      (k, v) <- edge.getPropsWithTs().asScala.toMap
      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
    } yield (v.labelMeta.name -> jsValue)
  }

  def s2VertexPropsJson(vertex: S2VertexLike): Map[String, JsValue] = {
    import scala.collection.JavaConverters._
    for {
      (k, v) <- vertex.props.asScala.toMap
      jsValue <- JSONParser.anyValToJsValue(v.innerVal.value)
    } yield (v.columnMeta.name -> jsValue)
  }
}